package com.xiang.ad.mysql.listener;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.xiang.ad.mysql.TemplateHolder;
import com.xiang.ad.mysql.dto.BinlogRowData;
import com.xiang.ad.mysql.dto.TableTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 *  =============插入================
 *  当我给ad_unit_keyword插入了一行数据后
 *  Write---------------
 *  WriteRowsEventData{tableId=94, includedColumns={0, 1, 2}, rows=[
 *  [1, 10, 测试用]
 *  ]}
 *  =============更新================
 *  Update--------------
 *  UpdateRowsEventData{tableId=94, includedColumnsBeforeUpdate={0, 1, 2}, includedColumns={0, 1, 2}, rows=[
 *  {before=[1, 10, 测试用], after=[1, 11, 测试用]}
 *  ]}
 *  =============删除================
 *  Delete--------------
 *  DeleteRowsEventData{tableId=94, includedColumns={0, 1, 2}, rows=[
 *  [1, 11, 测试用]
 *  ]}
 */


/**
 * Created by xiang.
 * 监听并解析binlog
 * 1、getAfterValues:根据不同的操作类型，获取到对应的rows。rows部分就是binlog中记录的数据信息
 * 2、对获取到的信息内容list实现遍历，对应 列和值 关联起来，构成afterMapList
 * 3、设置binlogRowData的after字段以及table字段，最后返回
 * 4、构成好的binlogRowData与onevent里面的rowData实现结合，最后传递给listener
 * 5、这样就实现了 将binlog  data转换成想要的 binlogRowData（我自己定义的格式，方便后面服务用）
 * 6、
 */
@Slf4j
@Component  //需要实现BinaryLogClient.EventListener才能实现监听
public class AggregationListener implements BinaryLogClient.EventListener {

    private String dbName;//数据库的名字
    private String tableName;//表的名字

    //监听器的map  不同的表需要不同的监听器
    private Map<String, Ilistener> listenerMap = new HashMap<>();

    //用来 加载并解析模板文件的 类
    private final TemplateHolder templateHolder;

    @Autowired
    public AggregationListener(TemplateHolder templateHolder) {
        this.templateHolder = templateHolder;
    }

    //构造一个生成key的方法，生成唯一的标识（数据库+表）
    private String genKey(String dbName, String tableName) {
        return dbName + ":" + tableName;
    }

    //对外部提供一个注册监听器的方法
    public void register(String _dbName, String _tableName,
                         Ilistener ilistener) {
        log.info("register : {}-{}", _dbName, _tableName);//打印日志，标识对哪个数据库的哪个表进行注册监听器了
        this.listenerMap.put(genKey(_dbName, _tableName), ilistener);//注册 key-listener
    }

    //对event进行解析   将event解析成自己定义的BinlogRowData，将BinlogRowData传递给监听器，实现增量数据的更新
    @Override
    public void onEvent(Event event) {

        //
        EventType type = event.getHeader().getEventType();//header中有操作event类型
        log.debug("event type: {}", type);//并不是所有的event type都要处理

        //如果EventType是TABLE_MAP  TABLE_MAP中会记录当前数据库的名字和当前表的名字
        if (type == EventType.TABLE_MAP) {
            TableMapEventData data = event.getData();
            this.tableName = data.getTable();//数据库名
            this.dbName = data.getDatabase();//数据表名
            return;
        }

        //update、write、delete 不是这三种类型 就不处理
        if (       type != EventType.EXT_UPDATE_ROWS //EventType是根据版本 在官网看的，每个版本不一样，我这里是mysql8.0的版本
                && type != EventType.EXT_WRITE_ROWS
                && type != EventType.EXT_DELETE_ROWS) {
            return;
        }

        // 判断 表名和库名是否已经完成填充，如果没有完成填充，说明这是一个错误的binlog事件，不予处理
        if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tableName)) {
            log.error("no meta data event");//错误的binlog事件，或者代码出了问题
            return;
        }

        // 这里只会处理 template.json 中定义的那些表，其他的还没有设置，不会去处理
        // 只处理我们业务逻辑上 关心的这些表，对其注册监听器，其他的都不关心
        // 找出对应表有兴趣的监听器  （兴趣就是 就是已经注册过对应监听器的）
        String key = genKey(this.dbName, this.tableName);//构造key
        Ilistener listener = this.listenerMap.get(key);//尝试获取listener
        if (null == listener) {
            log.debug("skip {}", key);//打印 跳过debug日志
            return;
        }

        log.info("trigger event: {}", type.name());//打印触发事件的名字

        //尝试处理这条binlog
        try {
            //将eventData转化为想要的BinlogRowData，之后就可以将rowData传递给感兴趣的监听器去处理了
            BinlogRowData rowData = buildRowData(event.getData());
            if (rowData == null) {
                return;
            }

            rowData.setEventType(type);
            listener.onEvent(rowData);//将rowData传递给感兴趣的监听器去处理

        } catch (Exception ex) {//catch异常
            ex.printStackTrace();  //打印异常堆栈
            log.error(ex.getMessage()); //打印error
        } finally {
            //处理这个事件的时候，把dbName和tableName都已经记录下来了
            //处理完一条事件后，需要清空 这些
            this.dbName = "";
            this.tableName = "";
        }
    }

    /**
     * 统一三类eventType
     * 根据不同的类型，获取到统一格式的数据，比较好处理
     * 因为三种event打印出来的信息是有差别的,删除和写入没有 before和after这俩概念
     * 假设删除和写入的before和after是空（想象删除和写入是特殊的更新）
     */
    private List<Serializable[]> getAfterValues(EventData eventData) {

        //写入
        if (eventData instanceof WriteRowsEventData) {
            return ((WriteRowsEventData) eventData).getRows();//getRows就代表插入的最新数据
        }

        //更新
        if (eventData instanceof UpdateRowsEventData) {
            return ((UpdateRowsEventData) eventData).getRows().stream()
                    .map(Map.Entry::getValue)//获取value部分，得到的after（key是before）
                    .collect(Collectors.toList());
        }

        //删除
        if (eventData instanceof DeleteRowsEventData) {
            return ((DeleteRowsEventData) eventData).getRows();
        }

        return Collections.emptyList();//返回空list表示不去处理
    }

    //解析eventData，将eventData转化为想要的BinlogRowData
    private BinlogRowData buildRowData(EventData eventData) {

        //获取对应的template信息
        TableTemplate table = templateHolder.getTable(tableName);

        if (null == table) {//获取不到就是出错了，打印日志方便找错
            log.warn("table {} not found", tableName);
            return null;
        }

        //用于填充binlogRowdata中的after，before不关心，因为关心的是修改后的数据
        List<Map<String, String>> afterMapList = new ArrayList<>();

        for (Serializable[] after : getAfterValues(eventData)) {

            //key：value    column：值
            Map<String, String> afterMap = new HashMap<>();

            int colLen = after.length;

            for (int ix = 0; ix < colLen; ++ix) {

                // 取出当前位置对应的列名
                String colName = table.getPosMap().get(ix);

                // 如果没有则说明不关心这个列
                if (null == colName) {
                    log.debug("ignore position: {}", ix);
                    continue;
                }
                //获取到对应列的值
                String colValue = after[ix].toString();
                afterMap.put(colName, colValue);
            }

            afterMapList.add(afterMap);//填充
        }

        //转换完毕了，这里填充BinlogRowData，实现了 解析eventData并转化为BinlogRowData
        BinlogRowData rowData = new BinlogRowData();
        rowData.setAfter(afterMapList);
        rowData.setTable(table);

        return rowData;
    }
}
